O k-Nearest Neighbors (ou KNN) é uma técnica de classificação bem simples que consiste em prever uma classe alvo ao encontrar a(s) classe(s) vizinha(s) mais próxima(s).
Neste tutorial iremos apresentar a implementação do algoritmo k-Nearest Neighbors. Usaremos como exemplo o conjunto de dados Abalone, que são um tipo de moluscos gastrópodes. Veja neste link.
Este problema deseja predizer a idade de um abalone através de medidas físicas. A idade de um abalone é determinada cortando a concha através do cone, manchando-a e contando o número de anéis através de um microscópio - uma tarefa chata e demorada. Outra medidas que são mais facéis de obter, são usadas para prever a idade, como padrões climáticos e localização (portanto, disponibilidade de alimentos).
1.1 Carregar arquivo
A primeira coisa que precisamos fazer é carregar nosso arquivo de dados. Os dados estão no formato CSV sem linha de cabeçalho. Podemos abrir o arquivo com a função open e ler as linhas de dados usando a função de leitor no módulo csv.
Também precisamos converter os atributos que foram carregados como strings em números para que possamos trabalhar com eles. Abaixo está a função load_csv( ) para carregar o conjunto de dados Abalone.
In [6]:
from csv import reader
from math import sqrt
# carregar um arquivo csv
def load_csv(filename):
dataset = list()
with open(filename, 'r') as file:
csv_reader = reader(file)
for row in csv_reader:
if not row:
continue
dataset.append(row)
return dataset
# Carregando dataset Abalone
filename = 'abalone.csv'
dataset = load_csv(filename)
print ('Arquivo de dados {0} carregado com {1} linhas e {2} colunas'.format(filename, len(dataset), len(dataset[0])))
1.2 Converter Valores Não-numéricos
É importante que todos os valores sejam numéricos para que possamos caclular as distâncias euclidianas
Abaixo funções que convertem string para float ou string para int
In [7]:
# Converte string para float
def str_column_to_float(dataset, column):
for row in dataset:
row[column] = float(row[column].strip())
# Converte string para int
def str_column_to_int(dataset, column):
class_values = [row[column] for row in dataset]
unique = set(class_values)
lookup = dict()
for i, value in enumerate(unique):
lookup[value] = i
for row in dataset:
row[column] = lookup[row[column]]
return lookup
1.3 Normalizar
Algumas colunas têm variância maior que outras, por isso é importante normalizar todas as colunas. Para isso calcularemos primeiro os mínimos e máximos.
In [8]:
def dataset_minmax(dataset):
minmax = list()
for i in range(len(dataset[0])):
col_values = [row[i] for row in dataset]
value_min = min(col_values)
value_max = max(col_values)
minmax.append([value_min, value_max])
return minmax
def normalize_dataset(dataset, minmax):
for row in dataset:
for i in range(len(row)):
row[i] = (row[i] - minmax[i][0]) / (minmax[i][1] - minmax[i][0])
O primeiro passo necessário é calcular a distância entre duas linhas em um conjunto de dados. Linhas de dados são constituídos principalmente por números e uma maneira fácil de calcular a distância entre duas linhas ou vetores de números é desenhar uma linha reta. Isso faz sentido em 2D ou 3D e escala muito bem para maiores dimensões. Podemos calcular a distância da linha reta entre dois vetores usando a medida de distância euclidiana. É calculado como a raiz quadrada da soma do quadrado diferenças entre os dois vetores. Com a distância euclidiana, quanto menos o valor, maior a similaridade. O valor $0$ significa que não há nenhuma diferença entre dois registros.
Abaixo temos a função que calcula isso:
In [15]:
from math import sqrt
def euclidean_distance(row1, row2):
distance = 0.0
for i in range(len(row1)-1):
distance += (row1[i] - row2[i])**2
return sqrt(distance)
In [17]:
# Teste da função euclidean_distance
dataset = [[2.7810836,2.550537003,0],
[1.465489372,2.362125076,0],
[3.396561688,4.400293529,0],
[1.38807019,1.850220317,0],
[3.06407232,3.005305973,0],
[7.627531214,2.759262235,1],
[5.332441248,2.088626775,1],
[6.922596716,1.77106367,1],
[8.675418651,-0.242068655,1],
[7.673756466,3.508563011,1]]
row0 = dataset[0]
for row in dataset:
distance = euclidean_distance(row0, row)
print(distance)
Os vizinhos para um novo dado no conjunto de dados são as k instâncias mais próximas. Para localizar os vizinhos para um novo dado dentro de um conjunto de dados, devemos primeiro calcular a distância entre cada registro no conjunto de dados para o novo dado. Nós podemos fazer isso usando nossa função de distância acima. Uma vez que as distâncias são calculadas, devemos ordenar todas os registros no conjunto de dados de treinamento por sua distância entre o novo dado. Podemos então selecionar o top k para retornar como os vizinhos mais parecidos.
Podemos fazer isso guardando a distância de cada registro no conjunto de dados como uma tupla, ordenar a lista de tuplas pela distância(em ordem decrescente) e depois recuperar os vizinhos.
Abaixo está uma função chamada get_neighbors( ) que implementa isso.
In [19]:
def get_neighbors(train, test_row, num_neighbors):
distances = list()
for train_row in train:
dist = euclidean_distance(test_row, train_row)
distances.append((train_row, dist))
neighbors = list()
for i in range(num_neighbors):
neighbors.append(distances[i][0])
return neighbors
In [21]:
# Teste da função get_neighbors
dataset = [[2.7810836,2.550537003,0],
[1.465489372,2.362125076,0],
[3.396561688,4.400293529,0],
[1.38807019,1.850220317,0],
[3.06407232,3.005305973,0],
[7.627531214,2.759262235,1],
[5.332441248,2.088626775,1],
[6.922596716,1.77106367,1],
[8.675418651,-0.242068655,1],
[7.673756466,3.508563011,1]]
neighbors = get_neighbors(dataset, dataset[0], 3)
for neighbor in neighbors:
print(neighbor)
Os vizinhos mais semelhantes coletados do conjunto de dados de treinamento podem ser usados para fazer previsões. No caso da classificação, podemos retornar a classe mais representada entre os vizinhos. Podemos conseguir isso executando a função max( ) na lista de valores de saída dos vizinhos. Dada uma lista de valores de classe observadas nos vizinhos, a função max( ) toma um conjunto de valores de classe únicos e chama a contagem na lista de valores de classe para cada valor de classe no conjunto.
Abaixo está a função denominada predict_classification( ) que implementa isso.
In [22]:
def predict_classification(train, test_row, num_neighbors):
neighbors = get_neighbors(train, test_row, num_neighbors)
output_values = [row[-1] for row in neighbors]
prediction = max(set(output_values), key=output_values.count)
return prediction
In [23]:
# Teste da função predict_classification
dataset = [[2.7810836,2.550537003,0],
[1.465489372,2.362125076,0],
[3.396561688,4.400293529,0],
[1.38807019,1.850220317,0],
[3.06407232,3.005305973,0],
[7.627531214,2.759262235,1],
[5.332441248,2.088626775,1],
[6.922596716,1.77106367,1],
[8.675418651,-0.242068655,1],
[7.673756466,3.508563011,1]]
prediction = predict_classification(dataset, dataset[0], 3)
print( ' Expected %d, Got %d. ' % (dataset[0][-1], prediction))
In [25]:
def accuracy_metric(actual, predicted):
correct = 0
for i in range(len(actual)):
if actual[i] == predicted[i]:
correct += 1
return correct / float(len(actual)) * 100.0
5.2 Avaliar Algoritmo
Abaixo temos a função que avalia o algoritmo usando uma divisão de validação cruzada.
In [24]:
def evaluate_algorithm(dataset, algorithm, n_folds, *args):
folds = cross_validation_split(dataset, n_folds)
scores = list()
for fold in folds:
train_set = list(folds)
train_set.remove(fold)
train_set = sum(train_set, [])
test_set = list()
for row in fold:
row_copy = list(row)
test_set.append(row_copy)
row_copy[-1] = None
predicted = algorithm(train_set, test_set, *args)
actual = [row[-1] for row in fold]
accuracy = accuracy_metric(actual, predicted)
scores.append(accuracy)
return scores
In [27]:
from random import seed
from random import randrange
from csv import reader
from math import sqrt
# Carregar um arquivo CSV
def load_csv(filename):
dataset = list()
with open(filename, 'r') as file:
csv_reader = reader(file)
for row in csv_reader:
if not row:
continue
dataset.append(row)
return dataset
# Converter string para float
def str_column_to_float(dataset, column):
for row in dataset:
row[column] = float(row[column].strip())
# Converter string para integer
def str_column_to_int(dataset, column):
class_values = [row[column] for row in dataset]
unique = set(class_values)
lookup = dict()
for i, value in enumerate(unique):
lookup[value] = i
for row in dataset:
row[column] = lookup[row[column]]
return lookup
# Achar os maximos e minimos de cada coluna
def dataset_minmax(dataset):
minmax = list()
for i in range(len(dataset[0])):
col_values = [row[i] for row in dataset]
value_min = min(col_values)
value_max = max(col_values)
minmax.append([value_min, value_max])
return minmax
# Normalizar o conjunto de dados
def normalize_dataset(dataset, minmax):
for row in dataset:
for i in range(len(row)):
row[i] = (row[i] - minmax[i][0]) / (minmax[i][1] - minmax[i][0])
# Dividir o conjunto de dados em k folds
def cross_validation_split(dataset, n_folds):
dataset_split = list()
dataset_copy = list(dataset)
fold_size = int(len(dataset) / n_folds)
for i in range(n_folds):
fold = list()
while len(fold) < fold_size:
index = randrange(len(dataset_copy))
fold.append(dataset_copy.pop(index))
dataset_split.append(fold)
return dataset_split
# Calcular porcentagem de precisao
def accuracy_metric(actual, predicted):
correct = 0
for i in range(len(actual)):
if actual[i] == predicted[i]:
correct += 1
return correct / float(len(actual)) * 100.0
# Avaliar o algoritmo usando a validação cruzada
def evaluate_algorithm(dataset, algorithm, n_folds, *args):
folds = cross_validation_split(dataset, n_folds)
scores = list()
for fold in folds:
train_set = list(folds)
train_set.remove(fold)
train_set = sum(train_set, [])
test_set = list()
for row in fold:
row_copy = list(row)
test_set.append(row_copy)
row_copy[-1] = None
predicted = algorithm(train_set, test_set, *args)
actual = [row[-1] for row in fold]
accuracy = accuracy_metric(actual, predicted)
scores.append(accuracy)
return scores
# Calcular a distância euclidiana entre dois vetores
def euclidean_distance(row1, row2):
distance = 0.0
for i in range(len(row1)-1):
distance += (row1[i] - row2[i])**2
return sqrt(distance)
# Achar os vizinhos mais proximos
def get_neighbors(train, test_row, num_neighbors):
distances = list()
for train_row in train:
dist = euclidean_distance(test_row, train_row)
distances.append((train_row, dist))
distances.sort(key=lambda tup: tup[1])
neighbors = list()
for i in range(num_neighbors):
neighbors.append(distances[i][0])
return neighbors
# Fazer a predição com os vizinhos
def predict_classification(train, test_row, num_neighbors):
neighbors = get_neighbors(train, test_row, num_neighbors)
output_values = [row[-1] for row in neighbors]
prediction = max(set(output_values), key=output_values.count)
return prediction
# Algoritmo kNN
def k_nearest_neighbors(train, test, num_neighbors):
predictions = list()
for row in test:
output = predict_classification(train, row, num_neighbors)
predictions.append(output)
return(predictions)
# Teste
seed(1)
# Carregar e preparar o conjunto de dados
filename = 'abalone.csv'
dataset = load_csv(filename)
for i in range(1, len(dataset[0])):
str_column_to_float(dataset, i)
# Converter primeira conluna para integer
str_column_to_int(dataset, 0)
# Avaliar o algoritmo
n_folds = 5
num_neighbors = 5
scores = evaluate_algorithm(dataset, k_nearest_neighbors, n_folds, num_neighbors)
print('Scores: %s' % scores)
print('Mean Accuracy: %.3f%%' % (sum(scores)/float(len(scores))))
Um valor de $k = 5$ vizinhos foi usado para fazer previsões. Você pode experimentar com $k$ maiores para aumentar a precisão. Podemos ver que a precisão média de 23% é melhor do que a linha de base de 16%, mas é bastante pobre em geral. Isto é devido ao grande número de classes tornando a precisão um "pobre juiz" de habilidade sobre esse problema. Esse fato, combinado com o fato de que Muitas das classes têm poucos ou um exemplo também tornam o problema desafiador.
Nós também podemos modelar o conjunto de dados como um problema de modelagem preditiva de regressão. Isto é porque os valores da classe têm uma relação ordinal natural.
A regressão pode ser uma maneira mais útil de modelar este problema, dado o grande número de classes e dispersão de alguns valores de classe. Podemos facilmente mudar nosso exemplo acima para regressão por alterando KNN para prever a média dos vizinhos e usando Root Mean Squared Error(erro quadrado médio) para avaliar previsões. Abaixo está um exemplo completo com essas mudanças.
In [28]:
# k-Nearest Neighbors on the Abalone Dataset for Regression
from random import seed
from random import randrange
from csv import reader
from math import sqrt
# Carregar um arquivo CSV
def load_csv(filename):
dataset = list()
with open(filename, 'r') as file:
csv_reader = reader(file)
for row in csv_reader:
if not row:
continue
dataset.append(row)
return dataset
# Converter string para float
def str_column_to_float(dataset, column):
for row in dataset:
row[column] = float(row[column].strip())
# Converter string para integer
def str_column_to_int(dataset, column):
class_values = [row[column] for row in dataset]
unique = set(class_values)
lookup = dict()
for i, value in enumerate(unique):
lookup[value] = i
for row in dataset:
row[column] = lookup[row[column]]
return lookup
# Achar os maximos e minimos de cada coluna
def dataset_minmax(dataset):
minmax = list()
for i in range(len(dataset[0])):
col_values = [row[i] for row in dataset]
value_min = min(col_values)
value_max = max(col_values)
minmax.append([value_min, value_max])
return minmax
# Normalizar o conjunto de dados
def normalize_dataset(dataset, minmax):
for row in dataset:
for i in range(len(row)):
row[i] = (row[i] - minmax[i][0]) / (minmax[i][1] - minmax[i][0])
# Dividr o conjunto de dados em k folds
def cross_validation_split(dataset, n_folds):
dataset_split = list()
dataset_copy = list(dataset)
fold_size = int(len(dataset) / n_folds)
for i in range(n_folds):
fold = list()
while len(fold) < fold_size:
index = randrange(len(dataset_copy))
fold.append(dataset_copy.pop(index))
dataset_split.append(fold)
return dataset_split
# Calcular erro medio
def rmse_metric(actual, predicted):
sum_error = 0.0
for i in range(len(actual)):
prediction_error = predicted[i] - actual[i]
sum_error += (prediction_error ** 2)
mean_error = sum_error / float(len(actual))
return sqrt(mean_error)
# Avaliar o algoritmo usando a validacao cruzada
def evaluate_algorithm(dataset, algorithm, n_folds, *args):
folds = cross_validation_split(dataset, n_folds)
scores = list()
for fold in folds:
train_set = list(folds)
train_set.remove(fold)
train_set = sum(train_set, [])
test_set = list()
for row in fold:
row_copy = list(row)
test_set.append(row_copy)
row_copy[-1] = None
predicted = algorithm(train_set, test_set, *args)
actual = [row[-1] for row in fold]
rmse = rmse_metric(actual, predicted)
scores.append(rmse)
return scores
# Calcular a distância euclidiana entre dois vetores
def euclidean_distance(row1, row2):
distance = 0.0
for i in range(len(row1)-1):
distance += (row1[i] - row2[i])**2
return sqrt(distance)
# Achar os vizinhos mais proximos
def get_neighbors(train, test_row, num_neighbors):
distances = list()
for train_row in train:
dist = euclidean_distance(test_row, train_row)
distances.append((train_row, dist))
distances.sort(key=lambda tup: tup[1])
neighbors = list()
for i in range(num_neighbors):
neighbors.append(distances[i][0])
return neighbors
# Fazer a predição com os vizinhos
def predict_regression(train, test_row, num_neighbors):
neighbors = get_neighbors(train, test_row, num_neighbors)
output_values = [row[-1] for row in neighbors]
prediction = sum(output_values) / float(len(output_values))
return prediction
# Algoritmo kNN
def k_nearest_neighbors(train, test, num_neighbors):
predictions = list()
for row in test:
output = predict_regression(train, row, num_neighbors)
predictions.append(output)
return(predictions)
# Teste
seed(1)
# Carregar e preparar o conjunto de dados
filename = 'abalone.csv'
dataset = load_csv(filename)
for i in range(1, len(dataset[0])):
str_column_to_float(dataset, i)
# converter primeira conluna para integer
str_column_to_int(dataset, 0)
# avaliar o algoritmo
n_folds = 5
num_neighbors = 5
scores = evaluate_algorithm(dataset, k_nearest_neighbors, n_folds, num_neighbors)
print('Scores: %s' % scores)
print('Mean RMSE: %.3f' % (sum(scores)/float(len(scores))))
Ao executar este exemplo, imprime o RMSE em cada fold e o RMSE médio em todas os folds. Podemos ver que o RMSE de 2.242 anéis é melhor do que a linha de base de 3.222 anéis. Nós também temos um modelo que talvez seja mais útil no domínioe e com um desempenho mais fácil de compreender.